1 /* 2 * Collie - An asynchronous event-driven network framework using Dlang development 3 * 4 * Copyright (C) 2015-2017 Shanghai Putao Technology Co., Ltd 5 * 6 * Developer: putao's Dlang team 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module collie.net.client.client; 12 13 import std.socket; 14 15 import kiss.exception; 16 import kiss.event; 17 import kiss.util.timer; 18 import kiss.net.TcpStream; 19 import kiss.net.TcpStream; 20 import collie.net.client.linklogInfo; 21 import collie.net.client.exception; 22 import kiss.event.task; 23 24 @trusted abstract class BaseClient 25 { 26 alias ClientCreatorCallBack = void delegate(TcpStream); 27 alias LinklogInfo = TLinklogInfo!ClientCreatorCallBack; 28 29 this(EventLoop loop) 30 { 31 _loop = loop; 32 } 33 34 final bool isAlive() @trusted 35 { 36 return _logInfo.client && _logInfo.client.isRegistered; 37 } 38 39 final void setTimeout(uint s) @safe 40 { 41 _timeout = s; 42 } 43 44 @property tryCount(){return _tryCount;} 45 @property tryCount(uint count){_tryCount = count;} 46 47 final void connect(Address addr,ClientCreatorCallBack cback = null) @trusted 48 { 49 if(isAlive) 50 throw new SocketClientException("must set NewConnection callback "); 51 _logInfo.tryCount = 0; 52 _logInfo.cback = cback; 53 _logInfo.addr = addr; 54 _loop.postTask(newTask(&_postConnect)); 55 } 56 57 58 final void write(ubyte[] data, DataWrittenHandler cback = null) @trusted 59 { 60 write(new SocketStreamBuffer(data,cback)); 61 } 62 63 final void write(StreamWriteBuffer buffer) @trusted 64 { 65 if (_loop.isInLoopThread()) { 66 _postWriteBuffer(buffer); 67 } else { 68 _loop.postTask(newTask(&_postWriteBuffer, buffer)); 69 } 70 } 71 72 pragma(inline) 73 final void close() @trusted 74 { 75 if(_logInfo.client is null) return; 76 _loop.postTask(newTask(&_postClose)); 77 } 78 79 final @property TcpStream tcpStreamClient() @trusted {return _logInfo.client;} 80 final @property KissTimer timer() @trusted {return _timer;} 81 final @property uint timeout() @safe {return _timeout;} 82 final @property EventLoop eventLoop() @trusted {return _loop;} 83 protected: 84 void onActive() nothrow; 85 void onFailure() nothrow; 86 void onClose() nothrow; 87 void onRead(in ubyte[] data) nothrow; 88 void onTimeout(Object sender); 89 90 final startTimer() 91 { 92 if(_timeout == 0) 93 return; 94 if(_timer) 95 _timer.stop(); 96 else { 97 _timer = new KissTimer(_loop); 98 _timer.onTick(&onTimeout); 99 } 100 _timer.interval = _timeout * 1000; 101 _timer.start(); 102 } 103 private: 104 final void connect() 105 { 106 TcpStream stream = new TcpStream(_loop); 107 _logInfo.client = stream; 108 if(_logInfo.cback) 109 _logInfo.cback(stream); 110 stream.onConnected(&connectCallBack); 111 stream.onClosed(&doClose); 112 stream.onDataReceived(&onRead); 113 stream.connect(_logInfo.addr); 114 } 115 116 final void connectCallBack(bool state) nothrow{ 117 catchAndLogException((){ 118 if(state){ 119 _logInfo.cback = null; 120 onActive(); 121 } else { 122 _logInfo.client = null; 123 if(_logInfo.tryCount < _tryCount){ 124 _logInfo.tryCount ++; 125 connect(); 126 } else { 127 _logInfo.cback = null; 128 if(_timer) 129 _timer.stop(); 130 onFailure(); 131 } 132 } 133 }()); 134 135 } 136 final void doClose() nothrow 137 { 138 catchAndLogException((){ 139 if(_timer) 140 _timer.stop(); 141 // auto client = _logInfo.client; 142 _logInfo.client = null; 143 onClose(); 144 }()); 145 } 146 147 private: 148 final void _postClose(){ 149 if(_logInfo.client) 150 _logInfo.client.close(); 151 } 152 153 final void _postWriteBuffer(StreamWriteBuffer buffer) 154 { 155 if (_logInfo.client) { 156 _logInfo.client.write(buffer); 157 } else 158 buffer.doFinish(); 159 } 160 161 final void _postConnect(){ 162 startTimer(); 163 connect(); 164 } 165 166 private 167 EventLoop _loop; 168 LinklogInfo _logInfo; 169 uint _tryCount = 1; 170 KissTimer _timer; 171 uint _timeout; 172 } 173